import json
import logging
from datetime import datetime, timezone
from threading import Thread, enumerate as enumerate_threads
from time import sleep, time
from typing import List, Union, Optional, Callable, Sequence, Dict, Any

from attr import attrs, attrib
from dateutil.relativedelta import relativedelta

from .controller import PipelineController
from .job import ClearmlJob
from ..backend_interface.util import (
    datetime_from_isoformat,
    datetime_to_isoformat,
    mutually_exclusive,
)
from ..task import Task


@attrs
class BaseScheduleJob(object):
    name = attrib(type=str, default=None)
    base_task_id = attrib(type=str, default=None)
    base_function = attrib(type=Callable, default=None)
    queue = attrib(type=str, default=None)
    target_project = attrib(type=str, default=None)
    single_instance = attrib(type=bool, default=False)
    task_parameters = attrib(type=dict, default={})
    task_overrides = attrib(type=dict, default={})
    clone_task = attrib(type=bool, default=True)
    _executed_instances = attrib(type=list, default=None)

    def to_dict(self, full: bool = False) -> Dict[str, Any]:
        return {k: v for k, v in self.__dict__.items() if not callable(v) and (full or not str(k).startswith("_"))}

    def update(self, a_job: Union[Dict, "BaseScheduleJob"]) -> "BaseScheduleJob":
        converters = {a.name: a.converter for a in getattr(self, "__attrs_attrs__", [])}
        for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items():
            if v is not None and not callable(getattr(self, k, v)):
                setattr(self, k, converters[k](v) if converters.get(k) else v)
        return self

    def verify(self) -> None:
        if self.base_function and not self.name:
            raise ValueError("Entry 'name' must be supplied for function scheduling")
        if self.base_task_id and not self.queue:
            raise ValueError("Target 'queue' must be provided for function scheduling")
        if not self.base_function and not self.base_task_id:
            raise ValueError("Either schedule function or task-id must be provided")

    def get_last_executed_task_id(self) -> Optional[str]:
        return self._executed_instances[-1] if self._executed_instances else None

    def run(self, task_id: Optional[str]) -> None:
        if task_id:
            # make sure we have a new instance
            if not self._executed_instances:
                self._executed_instances = []
            self._executed_instances.append(str(task_id))

    def get_resolved_target_project(self) -> Optional[str]:
        if not self.base_task_id or not self.target_project:
            return self.target_project
        # noinspection PyBroadException
        try:
            task = Task.get_task(task_id=self.base_task_id)
            # noinspection PyProtectedMember
            if (
                PipelineController._tag in task.get_system_tags()
                and "/{}/".format(PipelineController._project_section) not in self.target_project
            ):
                # noinspection PyProtectedMember
                return "{}/{}/{}".format(self.target_project, PipelineController._project_section, task.name)
        except Exception:
            pass
        return self.target_project


@attrs
class ScheduleJob(BaseScheduleJob):
    _weekdays_ind = (
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
        "sunday",
    )

    execution_limit_hours = attrib(type=float, default=None)
    recurring = attrib(type=bool, default=True)
    starting_time = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    minute = attrib(type=float, default=None)
    hour = attrib(type=float, default=None)
    day = attrib(default=None)
    weekdays = attrib(default=None)
    month = attrib(type=float, default=None)
    year = attrib(type=float, default=None)
    _next_run = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    _execution_timeout = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    _last_executed = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    _schedule_counter = attrib(type=int, default=0)

    def verify(self) -> None:
        def check_integer(value: Union[int, float, str, None]) -> bool:
            try:
                return False if not isinstance(value, (int, float)) or int(value) != float(value) else True
            except (TypeError, ValueError):
                return False

        super(ScheduleJob, self).verify()
        if self.weekdays and self.day not in (None, 0, 1):
            raise ValueError("`weekdays` and `day` combination is not valid (day must be None,0 or 1)")
        if self.weekdays and any(w not in self._weekdays_ind for w in self.weekdays):
            raise ValueError("`weekdays` must be a list of strings, valid values are: {}".format(self._weekdays_ind))
        if not (self.minute or self.hour or self.day or self.month or self.year):
            raise ValueError("Schedule time/date was not provided")
        if self.minute and not check_integer(self.minute):
            raise ValueError("Schedule `minute` must be an integer")
        if self.hour and not check_integer(self.hour):
            raise ValueError("Schedule `hour` must be an integer")
        if self.day and not check_integer(self.day):
            raise ValueError("Schedule `day` must be an integer")
        if self.month and not check_integer(self.month):
            raise ValueError("Schedule `month` must be an integer")
        if self.year and not check_integer(self.year):
            raise ValueError("Schedule `year` must be an integer")

    def next_run(self) -> Optional[datetime]:
        return self._next_run

    def get_execution_timeout(self) -> Optional[datetime]:
        return self._execution_timeout

    def next(self) -> Optional[datetime]:
        """
        :return: Return the next run datetime, None if no scheduling needed
        """
        if not self.recurring and self._last_executed:
            self._next_run = None
            return self._next_run

        # make sure we have a starting time
        if not self.starting_time:
            self.starting_time = datetime.now(timezone.utc)

        # check if we have a specific date
        if self.year and self.year > 2000:
            # this is by definition a single execution only
            if self._last_executed:
                return None

            self._next_run = datetime(
                year=int(self.year),
                month=int(self.month or 1),
                day=int(self.day or 1),
                hour=int(self.hour or 0),
                minute=int(self.minute or 0),
            )
            if self.weekdays:
                self._next_run += relativedelta(weekday=self.get_weekday_ord(self.weekdays[0]))

            return self._next_run

        # check if we have a specific day of the week
        weekday = None
        if self.weekdays:
            # get previous weekday
            _weekdays = [self.get_weekday_ord(w) for w in self.weekdays]
            try:
                prev_weekday_ind = _weekdays.index(self._last_executed.weekday()) if self._last_executed else -1
            except ValueError:
                # in case previous execution was not in the weekday (for example executed immediately at scheduling)
                prev_weekday_ind = -1
            weekday = _weekdays[(prev_weekday_ind + 1) % len(_weekdays)]

        prev_timestamp = self._last_executed or self.starting_time
        # fix first scheduled job should be as close as possible to starting time
        if self._schedule_counter < 1:
            # we should get here the first time we need to schedule a job, after that the delta is fixed
            # If we have execute_immediately we need to get here after the first execution
            # (so even through we have self._last_executed)

            # if this is a daily schedule and we can still run it today, then we should
            run0 = self._calc_next_run(self.starting_time, weekday)
            run1 = self._calc_next_run(run0, weekday)
            delta = run1 - run0
            optional_first_timestamp = self._calc_next_run(prev_timestamp - delta, weekday)
            if optional_first_timestamp > prev_timestamp:
                # this is us, we can still run it
                self._next_run = optional_first_timestamp
                return self._next_run

        self._next_run = self._calc_next_run(prev_timestamp, weekday)
        return self._next_run

    def _calc_next_run(self, prev_timestamp: datetime, weekday: Optional[int]) -> datetime:
        # make sure that if we have a specific day we zero the minutes/hours/seconds
        if self.year:
            prev_timestamp = datetime(
                year=prev_timestamp.year,
                month=self.month or prev_timestamp.month,
                day=self.day or 1,
            )
        elif self.month:
            prev_timestamp = datetime(
                year=prev_timestamp.year,
                month=prev_timestamp.month,
                day=self.day or 1,
            )
        elif self.day is None and weekday is not None:
            # notice we assume every X hours on specific weekdays
            # other combinations (i.e. specific time at weekdays, is covered later)
            next_timestamp = datetime(
                year=prev_timestamp.year,
                month=prev_timestamp.month,
                day=prev_timestamp.day,
                hour=prev_timestamp.hour,
                minute=prev_timestamp.minute,
            )
            next_timestamp += relativedelta(
                years=self.year or 0,
                months=0 if self.year else (self.month or 0),
                hours=self.hour or 0,
                minutes=self.minute or 0,
                weekday=weekday if not self._last_executed else None,
            )
            # start a new day
            if next_timestamp.day != prev_timestamp.day:
                next_timestamp = datetime(
                    year=prev_timestamp.year,
                    month=prev_timestamp.month,
                    day=prev_timestamp.day,
                ) + relativedelta(
                    years=self.year or 0,
                    months=0 if self.year else (self.month or 0),
                    hours=self.hour or 0,
                    minutes=self.minute or 0,
                    weekday=weekday,
                )

            return next_timestamp

        elif self.day is not None and weekday is not None:
            # push to the next day (so we only have once a day)
            prev_timestamp = datetime(
                year=prev_timestamp.year,
                month=prev_timestamp.month,
                day=prev_timestamp.day,
            ) + relativedelta(days=1)
        elif self.day:
            # reset minutes in the hour (we will be adding additional hour/minute anyhow)
            prev_timestamp = datetime(
                year=prev_timestamp.year,
                month=prev_timestamp.month,
                day=prev_timestamp.day,
            )
        elif self.hour:
            # reset minutes in the hour (we will be adding additional minutes anyhow)
            prev_timestamp = datetime(
                year=prev_timestamp.year,
                month=prev_timestamp.month,
                day=prev_timestamp.day,
                hour=prev_timestamp.hour,
            )

        return prev_timestamp + relativedelta(
            years=self.year or 0,
            months=0 if self.year else (self.month or 0),
            days=0 if self.month or self.year else ((self.day or 0) if weekday is None else 0),
            hours=self.hour or 0,
            minutes=self.minute or 0,
            weekday=weekday,
        )

    def run(self, task_id: Optional[str]) -> datetime:
        super(ScheduleJob, self).run(task_id)
        if self._last_executed or self.starting_time != datetime.fromtimestamp(0):
            self._schedule_counter += 1

        self._last_executed = datetime.now(timezone.utc)
        if self.execution_limit_hours and task_id:
            self._execution_timeout = self._last_executed + relativedelta(
                hours=int(self.execution_limit_hours),
                minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60),
            )
        else:
            self._execution_timeout = None
        return self._last_executed

    @classmethod
    def get_weekday_ord(cls, weekday: Union[int, str]) -> int:
        if isinstance(weekday, int):
            return min(6, max(weekday, 0))
        return cls._weekdays_ind.index(weekday)


@attrs
class ExecutedJob(object):
    name = attrib(type=str, default=None)
    started = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    finished = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
    task_id = attrib(type=str, default=None)
    thread_id = attrib(type=str, default=None)

    def to_dict(self, full: bool = False) -> Dict[str, Any]:
        return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith("_")}


class BaseScheduler(object):
    def __init__(
        self,
        sync_frequency_minutes: float = 15,
        force_create_task_name: Optional[str] = None,
        force_create_task_project: Optional[str] = None,
        pooling_frequency_minutes: Optional[float] = None,
    ) -> None:
        """
        Create a Task scheduler service

        :param sync_frequency_minutes: Sync task scheduler configuration every X minutes.
        Allow to change scheduler in runtime by editing the Task configuration object
        :param force_create_task_name: Optional, force creation of Task Scheduler service,
        even if main Task.init already exists.
        :param force_create_task_project: Optional, force creation of Task Scheduler service,
        even if main Task.init already exists.
        """
        self._last_sync = 0
        self._pooling_frequency_minutes = pooling_frequency_minutes
        self._sync_frequency_minutes = sync_frequency_minutes
        if force_create_task_name or not Task.current_task():
            self._task = Task.init(
                project_name=force_create_task_project or "DevOps",
                task_name=force_create_task_name or "Scheduler",
                task_type=Task.TaskTypes.service,
                auto_resource_monitoring=False,
            )
        else:
            self._task = Task.current_task()

    def start(self) -> None:
        """
        Start the Task TaskScheduler loop (notice this function does not return)
        """
        if Task.running_locally():
            self._serialize_state()
            self._serialize()
        else:
            self._deserialize_state()
            self._deserialize()

        while True:
            # sync with backend
            try:
                if time() - self._last_sync > 60.0 * self._sync_frequency_minutes:
                    self._last_sync = time()
                    self._deserialize()
                    self._update_execution_plots()
            except Exception as ex:
                self._log("Warning: Exception caught during deserialization: {}".format(ex))
                self._last_sync = time()

            try:
                if self._step():
                    self._serialize_state()
                    self._update_execution_plots()
            except Exception as ex:
                self._log("Warning: Exception caught during scheduling step: {}".format(ex))
                # rate control
                sleep(15)

            # sleep until the next pool (default None)
            if self._pooling_frequency_minutes:
                self._log("Sleeping until the next pool in {} minutes".format(self._pooling_frequency_minutes))
                sleep(self._pooling_frequency_minutes * 60.0)

    def start_remotely(self, queue: str = "services") -> None:
        """
        Start the Task TaskScheduler loop (notice this function does not return)

        :param queue: Remote queue to run the scheduler on, default 'services' queue.
        """
        # make sure we serialize the current state if we are running locally
        if self._task.running_locally():
            self._serialize_state()
            self._serialize()
        # launch on the remote agent
        self._task.execute_remotely(queue_name=queue, exit_process=True)
        # we will be deserializing the state inside `start`
        self.start()

    def _update_execution_plots(self) -> None:
        """
        Update the configuration and execution table plots
        """
        pass

    def _serialize(self) -> None:
        """
        Serialize Task scheduling configuration only (no internal state)
        """
        pass

    def _serialize_state(self) -> None:
        """
        Serialize internal state only
        """
        pass

    def _deserialize_state(self) -> None:
        """
        Deserialize internal state only
        """
        pass

    def _deserialize(self) -> None:
        """
        Deserialize Task scheduling configuration only
        """
        pass

    def _step(self) -> bool:
        """
        scheduling processing step. Return True if a new Task was scheduled.
        """
        pass

    def _log(self, message: str, level: int = logging.INFO) -> None:
        if self._task:
            self._task.get_logger().report_text(message, level=level)
        else:
            print(message)

    def _launch_job(self, job: ScheduleJob) -> None:
        self._launch_job_task(job)
        self._launch_job_function(job)

    def _launch_job_task(
        self,
        job: BaseScheduleJob,
        task_parameters: Optional[dict] = None,
        add_tags: Optional[List[str]] = None,
    ) -> Optional[ClearmlJob]:
        # make sure this is not a function job
        if job.base_function:
            return None

        # check if this is a single instance, then we need to abort the Task
        if job.single_instance and job.get_last_executed_task_id():
            t = Task.get_task(task_id=job.get_last_executed_task_id())
            if t.status in ("in_progress", "queued"):
                self._log("Skipping Task {} scheduling, previous Task instance {} still running".format(job.name, t.id))
                job.run(None)
                return None

        # actually run the job
        task_job = ClearmlJob(
            base_task_id=job.base_task_id,
            parameter_override=task_parameters or job.task_parameters,
            task_overrides=job.task_overrides,
            disable_clone_task=not job.clone_task,
            allow_caching=False,
            target_project=job.get_resolved_target_project(),
            tags=[add_tags] if add_tags and isinstance(add_tags, str) else add_tags,
        )
        self._log("Scheduling Job {}, Task {} on queue {}.".format(job.name, task_job.task_id(), job.queue))
        if task_job.launch(queue_name=job.queue):
            # mark as run
            job.run(task_job.task_id())
        return task_job

    def _launch_job_function(self, job: BaseScheduleJob, func_args: Optional[Sequence] = None) -> Optional[Thread]:
        # make sure this IS a function job
        if not job.base_function:
            return None

        # check if this is a single instance, then we need to abort the Task
        if job.single_instance and job.get_last_executed_task_id():
            # noinspection PyBroadException
            try:
                a_thread = [t for t in enumerate_threads() if t.ident == job.get_last_executed_task_id()]
                if a_thread:
                    a_thread = a_thread[0]
            except Exception:
                a_thread = None

            if a_thread and a_thread.is_alive():
                self._log(
                    "Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format(
                        job.name, a_thread.ident
                    )
                )
                job.run(None)
                return None

        self._log("Scheduling Job '{}', Task '{}' on background thread".format(job.name, job.base_function))
        t = Thread(target=job.base_function, args=func_args or ())
        t.start()
        # mark as run
        job.run(t.ident)
        return t

    @staticmethod
    def _cancel_task(task_id: str) -> ():
        if not task_id:
            return
        t = Task.get_task(task_id=task_id)
        status = t.status
        if status in ("in_progress",):
            t.stopped(force=True)
        elif status in ("queued",):
            Task.dequeue(t)


class TaskScheduler(BaseScheduler):
    """
    Task Scheduling controller.
    Notice time-zone is ALWAYS UTC
    """

    _configuration_section = "schedule"

    def __init__(
        self,
        sync_frequency_minutes: float = 15,
        force_create_task_name: Optional[str] = None,
        force_create_task_project: Optional[str] = None,
    ) -> None:
        """
        Create a Task scheduler service

        :param sync_frequency_minutes: Sync task scheduler configuration every X minutes.
            Allow to change scheduler in runtime by editing the Task configuration object
        :param force_create_task_name: Optional, force creation of Task Scheduler service,
            even if main Task.init already exists.
        :param force_create_task_project: Optional, force creation of Task Scheduler service,
            even if main Task.init already exists.
        """
        super(TaskScheduler, self).__init__(
            sync_frequency_minutes=sync_frequency_minutes,
            force_create_task_name=force_create_task_name,
            force_create_task_project=force_create_task_project,
        )
        self._schedule_jobs = []  # List[ScheduleJob]
        self._timeout_jobs = {}  # Dict[datetime, str]
        self._executed_jobs = []  # List[ExecutedJob]

    def add_task(
        self,
        schedule_task_id: Union[str, Task] = None,
        schedule_function: Callable = None,
        queue: str = None,
        name: Optional[str] = None,
        target_project: Optional[str] = None,
        minute: Optional[int] = None,
        hour: Optional[int] = None,
        day: Optional[int] = None,
        weekdays: Optional[List[str]] = None,
        month: Optional[int] = None,
        year: Optional[int] = None,
        limit_execution_time: Optional[float] = None,
        single_instance: bool = False,
        recurring: bool = True,
        execute_immediately: bool = False,
        reuse_task: bool = False,
        task_parameters: Optional[dict] = None,
        task_overrides: Optional[dict] = None,
    ) -> bool:
        """
        Create a cron job-like scheduling for a pre-existing Task.
        Notice, it is recommended to give the schedule entry a descriptive unique name,
        if not provided, a name is randomly generated.

        When timespec parameters are specified exclusively, they define the time between task launches (see
        `year` and `weekdays` exceptions). When multiple timespec parameters are specified, the parameter representing
        the longest duration defines the time between task launches, and the shorter timespec parameters define specific
        times.

        Examples:

        Launch every 15 minutes:

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', minute=15)

        Launch every 1 hour:

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', hour=1)

        Launch every 1 hour at hour:30 minutes (i.e. 1:30, 2:30 etc.):

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', hour=1, minute=30)

        Launch every day at 22:30 (10:30 pm):

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', minute=30, hour=22, day=1)

        Launch every other day at 7:30 (7:30 am):

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', minute=30, hour=7, day=2)

        Launch every Saturday at 8:30am (notice `day=0`):

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', minute=30, hour=8, day=0, weekdays=['saturday'])

        Launch every 2 hours on the weekends Saturday/Sunday (notice `day` is not passed):

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', hour=2, weekdays=['saturday', 'sunday'])

        Launch once a month at the 5th of each month:

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', month=1, day=5)

        Launch once a year on March 4th:

        .. code-block:: py

            add_task(schedule_task_id='1235', queue='default', year=1, month=3, day=4)

        :param schedule_task_id: ID of Task to be cloned and scheduled for execution
        :param schedule_function: Optional, instead of providing Task ID to be scheduled,
            provide a function to be called. Notice the function is called from the scheduler context
            (i.e. running on the same machine as the scheduler)
        :param queue: Name or ID of queue to put the Task into (i.e. schedule)
        :param name: Name or description for the cron Task (should be unique if provided, otherwise randomly generated)
        :param target_project: Specify target project to put the cloned scheduled Task in.
        :param minute: Time (in minutes) between task launches. If specified together with `hour`, `day`, `month`,
            and / or  `year`, it defines the minute of the hour
        :param hour: Time (in hours) between task launches. If specified together with `day`, `month`, and / or
            `year`, it defines the hour of day.
        :param day: Time (in days) between task executions. If specified together with  `month` and / or `year`,
            it defines the day of month
        :param weekdays: Days of week to launch task (accepted inputs: 'monday', 'tuesday', 'wednesday',
            'thursday', 'friday', 'saturday', 'sunday')
        :param month: Time (in months) between task launches. If specified with `year`, it defines a specific month
        :param year: Specific year if value >= current year. Time (in years) between task launches if
            value <= 100
        :param limit_execution_time: Limit the execution time (in hours) of the specific job.
        :param single_instance: If True, do not launch the Task job if the previous instance is still running
            (skip until the next scheduled time period). Default False.
        :param recurring: If False, only launch the Task once (default: True, repeat)
        :param execute_immediately: If True, schedule the Task to be executed immediately
            then recurring based on the timing schedule arguments. Default False.
        :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time, default False.
        :param task_parameters: Configuration parameters to the executed Task.
            for example: ``{'Args/batch': '12'}`` Notice: not available when reuse_task=True
        :param task_overrides: Change task definition.
            for example ``{'script.version_num': None, 'script.branch': 'main'}`` Notice: not available when reuse_task=True

        :return: True if job is successfully added to the scheduling list
        """
        mutually_exclusive(schedule_function=schedule_function, schedule_task_id=schedule_task_id)
        task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or "")

        # noinspection PyProtectedMember
        job = ScheduleJob(
            name=name or task_id,
            base_task_id=task_id,
            base_function=schedule_function,
            queue=queue,
            target_project=target_project,
            execution_limit_hours=limit_execution_time,
            recurring=bool(recurring),
            single_instance=bool(single_instance),
            task_parameters=task_parameters,
            task_overrides=task_overrides,
            clone_task=not bool(reuse_task),
            starting_time=datetime.fromtimestamp(0) if execute_immediately else datetime.now(timezone.utc),
            minute=minute,
            hour=hour,
            day=day,
            weekdays=weekdays,
            month=month,
            year=year,
        )
        # raise exception if not valid
        job.verify()

        self._schedule_jobs.append(job)
        return True

    def get_scheduled_tasks(self) -> List[ScheduleJob]:
        """
        Return the current set of scheduled jobs

        :return: List of ScheduleJob instances
        """
        return self._schedule_jobs

    def remove_task(self, task_id: Union[str, Task, Callable]) -> bool:
        """
        Remove a Task ID from the scheduled task list.

        :param task_id: Task or Task ID to be removed
        :return: return True of the Task ID was found in the scheduled jobs list and was removed.
        """
        if isinstance(task_id, (Task, str)):
            task_id = task_id.id if isinstance(task_id, Task) else str(task_id)
            if not any(t.base_task_id == task_id for t in self._schedule_jobs):
                return False
            self._schedule_jobs = [t for t in self._schedule_jobs if t.base_task_id != task_id]
        else:
            if not any(t.base_function == task_id for t in self._schedule_jobs):
                return False
            self._schedule_jobs = [t for t in self._schedule_jobs if t.base_function != task_id]
        return True

    def start(self) -> None:
        """
        Start the Task TaskScheduler loop (notice this function does not return)
        """
        super(TaskScheduler, self).start()

    def _step(self) -> bool:
        """
        scheduling processing step
        """
        # update next execution datetime
        for j in self._schedule_jobs:
            j.next()

        # get idle timeout (aka sleeping)
        scheduled_jobs = sorted(
            [j for j in self._schedule_jobs if j.next_run() is not None],
            key=lambda x: x.next_run(),
        )
        # sort by key
        timeout_job_datetime = min(self._timeout_jobs, key=self._timeout_jobs.get) if self._timeout_jobs else None
        if not scheduled_jobs and timeout_job_datetime is None:
            # sleep and retry
            seconds = 60.0 * self._sync_frequency_minutes
            self._log("Nothing to do, sleeping for {:.2f} minutes.".format(seconds / 60.0))
            sleep(seconds)
            return False

        next_time_stamp = scheduled_jobs[0].next_run() if scheduled_jobs else None
        if timeout_job_datetime is not None:
            next_time_stamp = min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime

        sleep_time = (next_time_stamp - datetime.now(timezone.utc)).total_seconds()
        if sleep_time > 0:
            # sleep until we need to run a job or maximum sleep time
            seconds = min(sleep_time, 60.0 * self._sync_frequency_minutes)
            self._log(
                "Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.".format(
                    next_time_stamp, seconds / 60.0
                )
            )
            sleep(seconds)
            return False

        # check if this is a Task timeout check
        if timeout_job_datetime is not None and next_time_stamp == timeout_job_datetime:
            task_id = self._timeout_jobs[timeout_job_datetime]
            self._log("Aborting job due to timeout: {}".format(task_id))
            self._cancel_task(task_id=task_id)
            self._timeout_jobs.pop(timeout_job_datetime, None)
        else:
            self._log("Launching job: {}".format(scheduled_jobs[0]))
            self._launch_job(scheduled_jobs[0])

        return True

    def start_remotely(self, queue: str = "services") -> None:
        """
        Start the Task TaskScheduler loop (notice this function does not return)

        :param queue: Remote queue to run the scheduler on, default 'services' queue.
        """
        super(TaskScheduler, self).start_remotely(queue=queue)

    def _serialize(self) -> None:
        """
        Serialize Task scheduling configuration only (no internal state)
        """
        # noinspection PyProtectedMember
        self._task._set_configuration(
            config_type="json",
            description="schedule tasks configuration",
            config_text=self._serialize_schedule_into_string(),
            name=self._configuration_section,
        )

    def _serialize_state(self) -> None:
        """
        Serialize internal state only
        """
        json_str = json.dumps(
            dict(
                scheduled_jobs=[j.to_dict(full=True) for j in self._schedule_jobs],
                timeout_jobs={datetime_to_isoformat(k): v for k, v in self._timeout_jobs.items()},
                executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs],
            ),
            default=datetime_to_isoformat,
        )
        self._task.upload_artifact(name="state", artifact_object=json_str, preview="scheduler internal state")

    def _deserialize_state(self) -> None:
        """
        Deserialize internal state only
        """
        # get artifact
        self._task.reload()
        artifact_object = self._task.artifacts.get("state")
        if artifact_object is not None:
            state_json_str = artifact_object.get(force_download=True)
            if state_json_str is not None:
                state_dict = json.loads(state_json_str)
                self._schedule_jobs = self.__deserialize_scheduled_jobs(
                    serialized_jobs_dicts=state_dict.get("scheduled_jobs", []),
                    current_jobs=self._schedule_jobs,
                )
                self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get("timeout_jobs") or {})}
                self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get("executed_jobs", [])]

    def _deserialize(self) -> None:
        """
        Deserialize Task scheduling configuration only
        """
        self._log("Syncing scheduler")
        self._task.reload()
        # noinspection PyProtectedMember
        json_str = self._task._get_configuration_text(name=self._configuration_section)
        try:
            self._schedule_jobs = self.__deserialize_scheduled_jobs(
                serialized_jobs_dicts=json.loads(json_str),
                current_jobs=self._schedule_jobs,
            )
        except Exception as ex:
            self._log("Failed deserializing configuration: {}".format(ex), level=logging.WARN)
            return

    @staticmethod
    def __deserialize_scheduled_jobs(
        serialized_jobs_dicts: List[Dict], current_jobs: List[ScheduleJob]
    ) -> List[ScheduleJob]:
        scheduled_jobs = [ScheduleJob().update(j) for j in serialized_jobs_dicts]
        scheduled_jobs = {j.name: j for j in scheduled_jobs}
        current_scheduled_jobs = {j.name: j for j in current_jobs}

        # select only valid jobs, and update the valid ones state from the current one
        new_scheduled_jobs = [
            current_scheduled_jobs[name].update(j) if name in current_scheduled_jobs else j
            for name, j in scheduled_jobs.items()
        ]
        # verify all jobs
        for j in new_scheduled_jobs:
            j.verify()

        return new_scheduled_jobs

    def _serialize_schedule_into_string(self) -> str:
        return json.dumps([j.to_dict() for j in self._schedule_jobs], default=datetime_to_isoformat)

    def _update_execution_plots(self) -> None:
        """
        Update the configuration and execution table plots
        """
        if not self._task:
            return

        task_link_template = (
            self._task.get_output_log_web_page()
            .replace("/{}/".format(self._task.project), "/{project}/")
            .replace("/{}/".format(self._task.id), "/{task}/")
        )

        # plot the schedule definition
        columns = [
            "name",
            "base_task_id",
            "base_function",
            "next_run",
            "target_project",
            "queue",
            "minute",
            "hour",
            "day",
            "month",
            "year",
            "starting_time",
            "execution_limit_hours",
            "recurring",
            "single_instance",
            "task_parameters",
            "task_overrides",
            "clone_task",
        ]
        scheduler_table = [columns]
        for j in self._schedule_jobs:
            j_dict = j.to_dict()
            j_dict["next_run"] = j.next()
            j_dict["base_function"] = (
                "{}.{}".format(
                    getattr(j.base_function, "__module__", ""),
                    getattr(j.base_function, "__name__", ""),
                )
                if j.base_function
                else ""
            )

            if not j_dict.get("base_task_id"):
                j_dict["clone_task"] = ""

            row = [
                str(j_dict.get(c)).split(".", 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or "")
                for c in columns
            ]
            if row[1]:
                row[1] = '<a href="{}">{}</a>'.format(task_link_template.format(project="*", task=row[1]), row[1])
            scheduler_table += [row]

        # plot the already executed Tasks
        executed_table = [["name", "task id", "started", "finished"]]
        for executed_job in sorted(self._executed_jobs, key=lambda x: x.started, reverse=True):
            if not executed_job.finished:
                if executed_job.task_id:
                    t = Task.get_task(task_id=executed_job.task_id)
                    if t.status not in ("in_progress", "queued"):
                        executed_job.finished = t.data.completed or datetime.now(timezone.utc)
                elif executed_job.thread_id:
                    # noinspection PyBroadException
                    try:
                        a_thread = [t for t in enumerate_threads() if t.ident == executed_job.thread_id]
                        if not a_thread or not a_thread[0].is_alive():
                            executed_job.finished = datetime.now(timezone.utc)
                    except Exception:
                        pass

            executed_table += [
                [
                    executed_job.name,
                    '<a href="{}">{}</a>'.format(
                        task_link_template.format(project="*", task=executed_job.task_id),
                        executed_job.task_id,
                    )
                    if executed_job.task_id
                    else "function",
                    str(executed_job.started).split(".", 1)[0],
                    str(executed_job.finished).split(".", 1)[0],
                ]
            ]

        self._task.get_logger().report_table(
            title="Schedule Tasks", series=" ", iteration=0, table_plot=scheduler_table
        )
        self._task.get_logger().report_table(title="Executed Tasks", series=" ", iteration=0, table_plot=executed_table)

    def _launch_job_task(
        self,
        job: ScheduleJob,
        task_parameters: Optional[dict] = None,
        add_tags: Optional[List[str]] = None,
    ) -> Optional[ClearmlJob]:
        task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags)
        # make sure this is not a function job
        if task_job:
            self._executed_jobs.append(
                ExecutedJob(name=job.name, task_id=task_job.task_id(), started=datetime.now(timezone.utc))
            )
            # add timeout check
            if job.get_execution_timeout():
                # we should probably make sure we are not overwriting a Task
                self._timeout_jobs[job.get_execution_timeout()] = task_job.task_id()
        return task_job

    def _launch_job_function(self, job: ScheduleJob, func_args: Optional[Sequence] = None) -> Optional[Thread]:
        thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args)
        # make sure this is not a function job
        if thread_job:
            self._executed_jobs.append(
                ExecutedJob(
                    name=job.name,
                    thread_id=str(thread_job.ident),
                    started=datetime.now(timezone.utc),
                )
            )
            # execution timeout is not supported with function callbacks.

        return thread_job
